package cn.rdtimes.wolfdsp.sdk;

import cn.rdtimes.wolfdsp.sdk.exception.NonMasterException;
import cn.rdtimes.wolfdsp.sdk.util.HttpClientUtil;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;

/**
 * Base class for scheduled tasks. Every concrete task extends this class and implements its
 * scheduling logic in {@link #doProgress()}.
 *
 * @author BZ
 */
public abstract class Task implements Runnable {
    /**
     * URL path used by a node to report task progress; the request is initiated by the client
     * and sent to the server.
     */
    private static final String SERVER_REPORT_URL = "/dsp/task/report";

    private final TaskStartMeta taskMeta;

    private Future<?> future;
    protected volatile TaskState state = TaskState.RUNNING;
    protected volatile int curProgress;
    protected volatile String exceptionInfo;
    protected volatile Map<String, Object> outputParams = new HashMap<>();

    /**
     * Creates a task bound to the given start metadata.
     *
     * @param taskMeta metadata supplying the job id, task id and the master/standby addresses
     *                 used when reporting
     */
    protected Task(TaskStartMeta taskMeta) {
        this.taskMeta = taskMeta;
    }

    /**
     * Runs {@link #doProgress()}, records the outcome, reports it to the server and finally
     * notifies the completed-listener obtained from {@code TaskService}.
     *
     * <p>On success the state becomes {@code FINISHED}. On any failure (an {@code Exception} or
     * an {@code Error}) the state becomes {@code FAILED}, {@link #exceptionInfo} is filled in and
     * {@link #outputParams} is set to {@code null}. Progress is set to 100 in both cases. An
     * {@code Error} is rethrown after the report and the listener call have been attempted.
     */
    @Override
    public void run() {
        Throwable failure = null;
        try {
            doProgress();

            state = TaskState.FINISHED;
            curProgress = 100;
        } catch (Exception e) {
            failure = e;
        } catch (Error e) {
            // Record the error so the task is not reported as still RUNNING, then let it
            // propagate exactly as it would have without this catch clause.
            failure = e;
            throw e;
        } finally {
            if (failure != null) {
                state = TaskState.FAILED;
                curProgress = 100;
                exceptionInfo = describeFailure(failure);
                outputParams = null;
                if (failure instanceof InterruptedException) {
                    // Restore the interrupt flag that was cleared when the exception was thrown.
                    // NOTE(review): the flag is restored before the report below is sent;
                    // confirm the HTTP client tolerates running on an interrupted thread.
                    Thread.currentThread().interrupt();
                }
            }

            // Report the result; a failed report must not prevent the listener notification.
            try {
                sentReportState(taskMeta.getJobId(), taskMeta.getTaskId(), state, exceptionInfo, curProgress,
                        outputParams);
            } catch (Exception e) {
                e.printStackTrace();
            }

            TaskService.getInstance().getCompletedListener().onCompleted(taskMeta.getJobId(), taskMeta.getTaskId(),
                    state, failure);
        }
    }

    /**
     * Sends a state/progress report to the master server; if that call throws
     * {@link NonMasterException} the same report is sent to the standby server instead.
     *
     * @param jobId         id of the job the task belongs to
     * @param taskId        id of the task
     * @param state         state to report
     * @param exceptionInfo failure description, may be {@code null}
     * @param curProgress   progress value to report
     * @param outputParams  output parameters to report, may be {@code null}
     * @throws Exception if the report fails (including a failure of the standby retry)
     */
    protected void sentReportState(String jobId, String taskId, TaskState state,
                                   String exceptionInfo, Integer curProgress,
                                   Map<String, Object> outputParams) throws Exception {
        try {
            HttpClientUtil.reportState(getServerUrl(true), jobId, taskId, state,
                    exceptionInfo, curProgress, outputParams);
        } catch (NonMasterException me) {
            HttpClientUtil.reportState(getServerUrl(false), jobId, taskId, state,
                    exceptionInfo, curProgress, outputParams);
        }
    }

    /**
     * Builds the report URL for either the master or the standby server.
     *
     * @param isMaster {@code true} for the master address, {@code false} for the standby address
     * @return the full {@code http://host:port/dsp/task/report} URL
     */
    private String getServerUrl(boolean isMaster) {
        if (isMaster) {
            return "http://" + taskMeta.getMasterIp() + ":" + taskMeta.getMasterPort() + SERVER_REPORT_URL;
        } else {
            return "http://" + taskMeta.getStandbyIp() + ":" + taskMeta.getStandbyPort() + SERVER_REPORT_URL;
        }
    }

    /**
     * Returns a non-null description of a failure: its message when present, otherwise its
     * {@code toString()} form (class name), so message-less exceptions are still reported.
     */
    private static String describeFailure(Throwable failure) {
        String message = failure.getMessage();
        return message != null ? message : failure.toString();
    }

    /**
     * Implemented by subclasses with their business logic, including filling in the output
     * parameters on completion (if there are any).
     * <ol>
     *   <li>Implementations may report progress and state to the server in real time by calling
     *       {@code sentReportState}.</li>
     *   <li>Implementations do not need to manage state transitions; {@link #run()} does that.</li>
     * </ol>
     *
     * @throws Exception on failure
     */
    protected abstract void doProgress() throws Exception;


    /**
     * Adds {@code value} to the current progress and returns the new value.
     *
     * @param value amount to add to the current progress
     * @return the progress after the addition
     */
    final synchronized int incAndGetProgress(int value) {
        curProgress += value;
        return curProgress;
    }

    /**
     * @return the future previously stored via {@link #setFuture(Future)}, or {@code null} if
     *         none has been set
     */
    final Future<?> getFuture() {
        return future;
    }

    /**
     * Stores the future associated with this task.
     *
     * @param future the future to store
     */
    public void setFuture(Future<?> future) {
        this.future = future;
    }

    /**
     * @return the current task state
     */
    public TaskState getState() {
        return state;
    }

    /**
     * @return the failure description recorded by {@link #run()}, or {@code null} if the task has
     *         not failed
     */
    public String getExceptionInfo() {
        return exceptionInfo;
    }

    /**
     * @return the output parameters; {@code null} after {@link #run()} has recorded a failure
     */
    public Map<String, Object> getOutputParams() {
        return outputParams;
    }

}
